package cog.support.services.thread;


import cog.support.services.thread.work.ThreadWork;
import cog.support.util.common.StringKit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Background thread service.
 * <p>
 * Manages long-running background worker threads: registered
 * {@link IThreadService} instances are started as {@link ThreadWork} tasks on
 * an internal thread pool, and {@link #run()} periodically prints a status
 * report for every started worker until the run mark is cleared.
 * <p>
 * Stopping the service shuts the internal pool down; the pool is never
 * re-created, so workers cannot be started again on the same instance after
 * {@link #stopThread()} or after {@link #run()} has returned.
 *
 * @author 陈杰 (ChenJie)
 * @since 2017-01-08 16:59:41
 * @version v0.1
 *
 * Copyright ChenJie(chenjie_java@aliyun.com)
 */
public class BackgroundThreadService{
    private static final Logger logger = LoggerFactory.getLogger(BackgroundThreadService.class);

    /**
     * Pause between two status reports of the monitor loop (five minutes).
     * */
    private static final long MONITOR_INTERVAL_MILLIS = 5L * 60L * 1000L;

    /**
     * Timestamp format used in the status report. Uses {@code HH} (24-hour clock):
     * the pattern has no AM/PM marker, so a 12-hour {@code hh} would be ambiguous.
     * Only used from the monitor loop, so the non-thread-safe formatter is not shared.
     * */
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");

    /**
     * Run mark: the monitor loop in {@link #run()} keeps going while this is true.
     * Volatile because it is cleared from a thread other than the one that loops on it.
     * */
    private volatile boolean runMark = true;

    /**
     * Name of this background service (shown in the status report).
     * */
    private String backgroundServicesName;

    /**
     * Workers that have been started. Copy-on-write so that the monitor loop can
     * iterate while another thread starts additional workers.
     * */
    private final List<ThreadWork> workList;

    /**
     * Registered thread services, keyed by their work name.
     * */
    private final ConcurrentMap<String,IThreadService> threadServiceConcurrentMap;

    /**
     * Pool that executes the workers.
     * */
    private final ExecutorService cachedThreadPool;

    /**
     * Time (epoch milliseconds) at which {@link #run()} was entered.
     * */
    private long runTime;

    /**
     * Time (epoch milliseconds) of the most recent monitor check.
     * */
    private long lastCheckTime;


    /**
     * Creates a service without any registered thread service.
     *
     * @param servicesName display name; an empty name falls back to {@code "DefaultServices"}
     */
    public BackgroundThreadService(String servicesName){
        this(servicesName,null);
    }

    /**
     * Creates a service and optionally registers a first thread service.
     * Nothing is started here; workers start in {@link #run()}, {@link #runThread()}
     * or {@link #addThreadServicesAndStart(IThreadService)}.
     *
     * @param servicesName display name; an empty name falls back to {@code "DefaultServices"}
     * @param service      thread service to register, or {@code null} for none
     */
    public BackgroundThreadService(String servicesName,IThreadService service){
        this.runMark = true;
        this.threadServiceConcurrentMap = new ConcurrentHashMap<>();
        this.workList = new CopyOnWriteArrayList<>();
        this.backgroundServicesName = StringKit.isEmpty(servicesName)?"DefaultServices":servicesName;
        // Unbounded pool with direct hand-off: every submitted worker gets a thread
        // immediately; idle threads are released after 60 seconds.
        this.cachedThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),new DefaultThreadFactory());
        if (service != null) {
            this.addThreadServices(service);
        }
    }

    /**
     * The default thread factory: creates non-daemon, normal-priority threads
     * named {@code background-<pool>-thread-<n>}.
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "background-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        /**
         * Creates the thread for the given task.
         *
         * @param r task the new thread will run
         * @return a non-daemon thread with normal priority
         */
        @Override
        public Thread newThread(Runnable r) {
            String theadName = namePrefix + threadNumber.getAndIncrement();

            // When the task is a ThreadWork, its work name is appended to the thread name.
            // NOTE(review): ThreadPoolExecutor hands its own internal worker wrapper to the
            // factory, so this branch probably only fires when the factory is used directly
            // - confirm whether the work name is expected in pool thread names.
            if(r instanceof ThreadWork){
                ThreadWork the = (ThreadWork) r;
                theadName += the.getThreadService().workName();
            }

            Thread t = new Thread(group, r,
                    theadName,
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    /**
     * Runs the service on the calling thread: records the start time, starts every
     * registered thread service, then blocks in the monitor loop until the run mark
     * is cleared (or the calling thread is interrupted), and finally stops all
     * workers and shuts the pool down.
     */
    public void run(){
        this.runTime = System.currentTimeMillis();

        logger.info("<后台服务> 后台服务监控线程开始运行");
        // Start the workers of every registered service.
        startRegisteredServices();

        // Blocks until the run mark is cleared or this thread is interrupted.
        monitorWorkStatus();

        // Stop the running workers.
        this.stopThreadServices();

        logger.info("<后台服务> 后台服务监控线程停止运行");
    }

    /**
     * Starts the workers of every registered thread service.
     * */
    private void startRegisteredServices(){
        for (IThreadService registered : threadServiceConcurrentMap.values()) {
            initThreadServices(registered);
        }
    }

    /**
     * Clears the run mark of every started worker, shuts the pool down
     * (no new workers are accepted afterwards) and forgets the workers.
     * */
    private void stopThreadServices(){
        for (ThreadWork threadWork : workList) {
            threadWork.setRunMark(false);
        }
        this.cachedThreadPool.shutdown();
        this.workList.clear();
    }


    /**
     * Monitor loop: while the run mark is set, prints a status report for the
     * service and each started worker, then sleeps for the monitor interval.
     * An interrupt ends the loop with the thread's interrupt flag restored.
     * */
    private void monitorWorkStatus(){
        while (runMark) {
            try {
                this.lastCheckTime = System.currentTimeMillis();
                StringBuilder monitorLog = this.getMonitorLog();
                for (ThreadWork work : workList) {
                    this.getLog(monitorLog, work);
                }
                System.out.print(monitorLog+"\r\n");
            } catch (Exception e) {
                // Pass the exception as the trailing argument so SLF4J logs the stack trace,
                // and its message as the placeholder value.
                logger.error("<后台服务> 后台服务监控线程运行出现异常，异常信息：{}", e.getMessage(), e);
            }

            // Sleep outside the reporting try-block so that a failing report
            // cannot turn the loop into a busy spin.
            try {
                Thread.sleep(MONITOR_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Treat the interrupt as a stop request and keep the flag for the caller.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Starts the workers of one thread service: creates {@code maxRunCount()}
     * {@link ThreadWork} instances (indexed from 0), submits each to the pool and
     * records it for monitoring. The service is not registered by this method.
     *
     * @param threadService service whose workers are started
     * */
    public void initThreadServices(IThreadService threadService){
        final int workerCount = threadService.maxRunCount();
        for (int i = 0; i < workerCount; i++) {
            ThreadWork threadWork = new ThreadWork(i,threadService.workName(),threadService, Thread.currentThread().getThreadGroup());
            this.cachedThreadPool.execute(threadWork);
            this.workList.add(threadWork);
        }
    }

    /**
     * Registers a thread service under its work name without starting it.
     * A service already registered under the same name is replaced.
     *
     * @param threadService service to register
     * */
    public void addThreadServices(IThreadService threadService){
        this.threadServiceConcurrentMap.put(threadService.workName(),threadService);
    }

    /**
     * Registers a thread service and immediately starts its workers.
     *
     * @param threadService service to register and start
     */
    public void addThreadServicesAndStart(IThreadService threadService){
        this.addThreadServices(threadService);
        this.initThreadServices(threadService);
    }

    /**
     * Builds the header of the status report: service name, number of started
     * workers, last check time, running time in seconds and the current time.
     *
     * @return a new builder holding the report header
     * */
    private StringBuilder getMonitorLog(){
        long runningSeconds = (System.currentTimeMillis()-this.runTime)/1000;
        StringBuilder logString = new StringBuilder();
        logString.append("\r\n\r\n------------------------------------------------\r\n");
        logString.append("  ").append(this.backgroundServicesName).append(" service run log\r\n");
        logString.append("------------------------------------------------\r\n");
        logString.append(" > thread run count   : ").append(workList.size()).append("\r\n");
        logString.append(" > last check time    : ").append(sdf.format(new Date(this.lastCheckTime))).append("\r\n");
        logString.append(" > running time       : ").append(runningSeconds).append("s\r\n");
        logString.append(" > log record time    : ").append(LocalDateTime.now().toString()).append("\r\n");
        logString.append("------------------------------------------------\r\n");
        return logString;
    }

    /**
     * Appends the status section of one worker (work name, run index, last run
     * time and error count) to the report.
     *
     * @param monitorLog report being built
     * @param work       worker to describe
     * */
    private void getLog(StringBuilder monitorLog,ThreadWork work){
        IThreadService threadService = work.getThreadService();
        monitorLog.append("  ").append(threadService.workName()).append("(").append(work.getRunIndex()).append(") service \r\n");
        monitorLog.append("------------------------------------------------\r\n");
        monitorLog.append(" > last run time      : ").append(sdf.format(new Date(work.getLastRunTime()))).append("\r\n");
        monitorLog.append(" > error work count   : ").append(work.getError()).append("\r\n");
        monitorLog.append("------------------------------------------------\r\n");
    }


    /**
     * Starts the workers of every registered thread service and returns
     * immediately. Unlike {@link #run()}, this neither records the start time
     * nor enters the monitor loop.
     */
    public void runThread(){
        logger.info("<后台服务> 后台服务监控线程开始运行");
        // Start the workers of every registered service.
        startRegisteredServices();
    }

    /**
     * Stops all started workers and shuts the pool down.
     */
    public void stopThread(){
        logger.info("<后台服务> 后台服务监控线程停止运行");
        // Stop the running workers.
        this.stopThreadServices();
    }

    /**
     * @return whether the monitor loop is still allowed to run
     */
    public boolean isRunMark() {
        return runMark;
    }

    /**
     * Sets the run mark; {@code false} makes the monitor loop in {@link #run()}
     * exit the next time it checks the mark (after its current sleep).
     *
     * @param runMark new run mark
     */
    public void setRunMark(boolean runMark) {
        this.runMark = runMark;
    }

    /**
     * @return the name of this background service
     */
    public String getServicesName() {
        return backgroundServicesName;
    }

    /**
     * @param servicesName new name of this background service (stored as given)
     */
    public void setServicesName(String servicesName) {
        this.backgroundServicesName = servicesName;
    }

    /**
     * @return the recorded start time in epoch milliseconds
     */
    public long getRunTime() {
        return runTime;
    }

    /**
     * @param runTime start time in epoch milliseconds used for the running-time report line
     */
    public void setRunTime(long runTime) {
        this.runTime = runTime;
    }


}
